package com.example.mqdemo.basicconsume;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Demo that consumes messages from the {@code topic_logs} topic exchange using a
 * {@link DefaultConsumer} subclass (callback style) rather than a delivery lambda.
 *
 * <p>The consumer is registered with the {@code x-cancel-on-ha-failover} consumer
 * argument and re-subscribes itself, with the same argument, whenever the broker
 * cancels it.
 */
public class ReceiveLogsTopicConsumerStyle {

    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "ckl-test-consume";
    private static final String BINDING_KEY = "*.ckl";

    /**
     * Connects to the broker on localhost, declares the exchange and queue, and starts
     * consuming with automatic acknowledgement. The connection is intentionally left
     * open so that the consumer keeps receiving until the process is killed.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String queueName = declareAndBindQueue(channel);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // Only the callbacks that do something are overridden; the remaining ones
        // (consume-ok, cancel-ok, recover-ok) keep DefaultConsumer's behaviour.
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            /**
             * Called when the consumer is cancelled by something other than this
             * client calling {@code basicCancel}. Re-declares the queue and binding
             * and subscribes this same consumer again, using the same consumer
             * arguments as the initial subscription.
             *
             * @param consumerTag tag of the cancelled consumer
             * @throws IOException if re-declaring or re-subscribing fails
             */
            @Override
            public void handleCancel(String consumerTag) throws IOException {
                System.out.println("consumer cancelled:" + consumerTag);
                super.handleCancel(consumerTag);

                System.out.println(" reconnect ...");
                Channel consumerChannel = getChannel();
                startConsuming(consumerChannel, declareAndBindQueue(consumerChannel), this);
            }

            /**
             * Original author's note: the connection can be force-closed from the
             * management web UI; once it is closed this callback is entered, and
             * recovery is triggered as well.
             *
             * @param consumerTag tag of the affected consumer
             * @param sig         the shutdown signal describing why the channel or
             *                    connection went away
             */
            @Override
            public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
                super.handleShutdownSignal(consumerTag, sig);
                System.out.println("consumer handleShutdownSignal:" + consumerTag);
                System.out.println(sig);
            }

            /**
             * Prints the routing key and the UTF-8 decoded body of each delivery.
             *
             * <p>Original author's note: {@code com.rabbitmq.client.Delivery} wraps
             * the same pieces of data in these fields:
             * <pre>
             *     private final Envelope _envelope;
             *     private final AMQP.BasicProperties _properties;
             *     private final byte[] _body;
             * </pre>
             *
             * @param consumerTag tag of the consumer receiving the message
             * @param envelope    delivery metadata (routing key is read from here)
             * @param properties  message properties (unused)
             * @param body        raw message payload, decoded as UTF-8
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" +
                        envelope.getRoutingKey() + "':'" + message + "'");
            }
        };

        startConsuming(channel, queueName, defaultConsumer);
    }

    /**
     * Declares the demo queue (non-durable, non-exclusive, no auto-delete) and binds
     * it to the topic exchange.
     *
     * @param channel channel to issue the declarations on
     * @return the queue name reported back by the broker
     * @throws IOException if the declare or bind fails
     */
    private static String declareAndBindQueue(Channel channel) throws IOException {
        String queueName = channel.queueDeclare(QUEUE_NAME,
                false, false, false, null).getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, BINDING_KEY);
        return queueName;
    }

    /**
     * Subscribes {@code consumer} to {@code queueName} with auto-ack, a server
     * generated consumer tag and the {@code x-cancel-on-ha-failover} argument, so the
     * initial subscription and every re-subscription are registered identically.
     *
     * @param channel   channel to consume on
     * @param queueName queue to consume from
     * @param consumer  callback object receiving deliveries and notifications
     * @throws IOException if the subscription fails
     */
    private static void startConsuming(Channel channel, String queueName, Consumer consumer)
            throws IOException {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-cancel-on-ha-failover", true);
        channel.basicConsume(queueName, true, args, consumer);
    }
}
